import numpy as np
import pandas as pd
from itertools import cycle
from scipy import interp
# sklearn
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
# TensorFlow
import tensorflow as tf
from tensorflow.estimator import LinearClassifier
# Visualisation libraries
## IPython
from IPython.display import clear_output
## Text
from colorama import Fore, Back, Style
from IPython.display import Image, display, Markdown, Latex
## seaborn
import seaborn as sns
sns.set_context('paper', rc={'font.size':12,'axes.titlesize':14,'axes.labelsize':12})
sns.set_style('white')
## matplotlib
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse, Polygon
import matplotlib.gridspec as gridspec
import matplotlib.colors
from pylab import rcParams
plt.style.use('seaborn-whitegrid')
plt.rcParams['figure.figsize'] = 14, 8
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12
plt.rcParams['text.color'] = 'k'
%matplotlib inline
## plotly
from plotly.offline import init_notebook_mode, iplot
import plotly.graph_objs as go
import plotly.offline as py
from plotly.subplots import make_subplots
import plotly.express as px
%config InlineBackend.figure_format = 'retina'
import warnings
warnings.filterwarnings("ignore")

In this article, we demonstrate how to solve a classification problem with TensorFlow Estimators, using the UCI ML Wine recognition dataset. The dataset can also be loaded through the scikit-learn datasets module.
These data are the results of a chemical analysis of wines grown in the same region in Italy but derived from three different cultivars. The analysis determined the quantities of 13 constituents found in each of the three types of wines.
# Load the wine data as pandas objects, then merge features and target into one frame.
Data = datasets.load_wine(as_frame=True)
Target = 'Class'
# Class names for plots and reports: underscores become spaces, then title-cased.
Labels = [name.replace('_', ' ').title() for name in Data['target_names']]
Data = pd.concat([Data['data'], pd.Series(Data['target']).to_frame(Target)], axis=1)
# Display names, listed in the same order as the columns of Data.
Temp = [
    'Alcohol', 'Malic Acid', 'Ash', 'Alcalinity of Ash', 'Magnesium', 'Total Phenols', 'Flavanoids',
    'Nonflavanoid phenols', 'Proanthocyanins', 'Color Intensity', 'Hue', 'OD280/OD315 of Diluted Wines',
    'Proline', 'Class',
]
# Map every raw column name to its display name.
Attributes = dict(zip(Data.columns, Temp))
del Temp
display(Data.rename(columns=Attributes).head())
def Data_info(Inp, Only_NaN = False):
    """Summarise the dtype and missing-value count of every column of ``Inp``.

    Parameters
    ----------
    Inp : pandas.DataFrame
        Frame to describe.
    Only_NaN : bool, default False
        When true, keep only the columns that contain at least one NaN.

    Returns
    -------
    pandas.DataFrame
        Indexed by column name (index named ``'Features'``) with the columns
        ``'Data Type'``, ``'Number of NaN Values'``, ``'Size'`` (row count of
        ``Inp``) and ``'Percentage'`` (share of NaN values, rounded to 2 dp).
    """
    n_rows = Inp.shape[0]
    dtype_table = Inp.dtypes.to_frame(name='Data Type').sort_values(by=['Data Type'])
    nan_counts = Inp.isnull().sum().to_frame(name='Number of NaN Values')
    summary = dtype_table.join(nan_counts, how='outer')
    summary['Size'] = n_rows
    summary['Percentage'] = np.round(100 * (summary['Number of NaN Values'] / n_rows), 2)
    summary.index.name = 'Features'
    if not Only_NaN:
        return summary
    return summary.loc[summary['Number of NaN Values'] > 0]
# Show the dtype / missing-value summary for every column of the wine data.
Data_info(Data)
Let's take a look at the variance of the features.
# Heat-map of the per-feature variances of the unscaled data, largest first.
Fig, ax = plt.subplots(figsize=(17, 12))
# One-row frame (row label 'Variance') with one column per feature.
Temp = Data.drop(columns=[Target]).var().sort_values(ascending=False).to_frame(name='Variance').round(2).T
Temp.columns = [Attributes[x] for x in Temp.columns]
# The row maximum is a Series indexed by the label 'Variance'; read it with
# .iloc[0], because integer lookup by position through [] is deprecated in pandas.
_ = sns.heatmap(Temp, ax=ax, annot=True, square=True, cmap=sns.color_palette("OrRd", 20),
                linewidths=0.8, vmin=0, vmax=Temp.max(axis=1).iloc[0], annot_kws={"size": 12},
                cbar_kws={'label': 'Feature Variance', "aspect": 40, "shrink": .4, "orientation": "horizontal"})
# Break long feature names over several lines, keeping "of" attached to the preceding word.
lb = [x.replace(' ', '\n').replace('\nof\n', ' of\n') for x in [item.get_text() for item in ax.get_xticklabels()]]
_ = ax.set_xticklabels(lb)
# Hide the single row label.
_ = ax.set_yticklabels('')
Furthermore, we would like to standardize features by removing the mean and scaling to unit variance.
# Scaling: standardise every feature column; the target column is left untouched.
Temp = Data.drop(columns=Target).columns.tolist()
scaler = StandardScaler()
_ = scaler.fit(Data[Temp])
Data[Temp] = scaler.transform(Data[Temp])
# Variance plot: same heat-map as for the raw data, now on the scaled features.
Fig, ax = plt.subplots(figsize=(17, 12))
# One-row frame (row label 'Variance') with one column per feature.
Temp = Data.drop(columns=[Target]).var().sort_values(ascending=False).to_frame(name='Variance').round(2).T
Temp.columns = [Attributes[x] for x in Temp.columns]
# The row maximum is a Series indexed by the label 'Variance'; read it with
# .iloc[0], because integer lookup by position through [] is deprecated in pandas.
_ = sns.heatmap(Temp, ax=ax, annot=True, square=True, cmap=sns.color_palette('Greens'),
                linewidths=0.8, vmin=0, vmax=Temp.max(axis=1).iloc[0], annot_kws={"size": 12},
                cbar_kws={'label': 'Feature Variance', "aspect": 40, "shrink": .4, "orientation": "horizontal"})
# Break long feature names over several lines, keeping "of" attached to the preceding word.
lb = [x.replace(' ', '\n').replace('\nof\n', ' of\n') for x in [item.get_text() for item in ax.get_xticklabels()]]
_ = ax.set_xticklabels(lb)
# Hide the single row label.
_ = ax.set_yticklabels('')
# Remove the label column from Data BEFORE copying the features. Copying first
# would leave the target inside X and leak the answer into the model inputs.
y = Data.pop(Target)
X = Data.copy()
# Fraction of the samples held out for testing.
Test_Size = 0.3
def Sets_Plot(Data, Test_Size):
    """Show the train/test proportions of ``Data`` as a horizontal stacked bar.

    Parameters
    ----------
    Data : object with a ``shape`` attribute (e.g. a DataFrame)
        Only ``Data.shape[0]``, the number of samples, is used.
    Test_Size : float
        Fraction of the samples assigned to the test set.

    Returns
    -------
    None
        The plotly figure is displayed with ``fig.show()``.
    """
    n_total = Data.shape[0]
    # Use the same rounding as sklearn's train_test_split for a fractional
    # test_size: the test set gets ceil(fraction * n) samples and the training
    # set the remainder. Truncating both counts can lose a sample.
    n_test = int(np.ceil(n_total * Test_Size))
    n_train = n_total - n_test
    Temp = pd.DataFrame({'Set': ['Train', 'Test'],
                         'Number of Instances': [n_train, n_test]})
    Temp['Percentage'] = np.round(100 * Temp['Number of Instances'].values / Temp['Number of Instances'].sum(), 2)
    # Both bars share the same empty y value so they stack into a single bar.
    fig = px.bar(Temp, y=['', ''], x='Number of Instances', orientation='h', color='Set', text='Percentage',
                 color_discrete_sequence=['PaleGreen', 'LightBlue'], height=180)
    fig.update_layout(plot_bgcolor='white', legend_orientation='h', legend=dict(x=0, y=1.7),
                      xaxis=dict(tickmode='array', tickvals=[0, n_total], ticktext=['', '']))
    fig.update_traces(marker_line_color='Black', marker_line_width=1.5, opacity=1)
    fig.update_traces(texttemplate='%{text:.2}% ', textposition='inside')
    fig.update_xaxes(title_text=None, range=[0, n_total])
    fig.update_yaxes(title_text=None)
    fig.show()
# Hold out Test_Size of the samples for testing; the seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=Test_Size, random_state=42)
# One-row table listing the shape of each of the four splits.
_parts = (('X_train', X_train), ('X_test', X_test), ('y_train', y_train), ('y_test', y_test))
display(
    pd.DataFrame(data={'Set': [label for label, _ in _parts],
                       'Shape': [part.shape for _, part in _parts]})
    .set_index('Set')
    .T
)
del _parts
Sets_Plot(Data, Test_Size)
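The input function specifies how data is converted to a tf.data.Dataset that feeds the input pipeline in a streaming fashion. An input function returns a tf.data.Dataset object that yields two-element tuples (features, labels): features is a dictionary that maps each feature name to an array of that feature's values, and labels is an array that holds the label of every example.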
def input_fn(features, labels, training=True, batch_size=256):
    """Build the batched ``tf.data.Dataset`` used to train or evaluate.

    ``features`` is converted with ``dict(features)`` and sliced together with
    ``labels``. In training mode the dataset is shuffled (buffer of 1000) and
    repeated; in either mode it is batched into ``batch_size`` elements.
    """
    pipeline = tf.data.Dataset.from_tensor_slices((dict(features), labels))
    if not training:
        # Evaluation / prediction: one ordered pass over the data.
        return pipeline.batch(batch_size)
    # Training: shuffle and repeat before batching.
    return pipeline.shuffle(1000).repeat().batch(batch_size)
Moreover, an estimator model consists of two main parts: feature columns and a numeric vector. Feature columns describe how the entries of the input numeric vector should be interpreted. The following function separates categorical and numerical columns (features) and returns a descriptive list of feature columns.
def Feat_Columns(Inp):
    """Return TensorFlow feature columns describing the columns of ``Inp``.

    Columns whose dtype is ``object`` become categorical vocabulary-list
    columns (vocabulary = the unique values found in ``Inp``); columns whose
    dtype is int64/int32/float64/float32 become numeric columns. Columns of
    any other dtype are skipped. Categorical columns come first in the result.
    """
    numeric_kinds = ('int64', 'int32', 'float64', 'float32')
    # Column name -> dtype name, in the frame's column order.
    dtype_names = Inp.dtypes.astype(str)
    categorical_names = [name for name, kind in dtype_names.items() if kind == 'object']
    numeric_names = [name for name, kind in dtype_names.items() if kind in numeric_kinds]

    feature_columns = []
    for name in categorical_names:
        feature_columns.append(
            tf.feature_column.categorical_column_with_vocabulary_list(name, Inp[name].unique())
        )
    for name in numeric_names:
        feature_columns.append(tf.feature_column.numeric_column(name))
    return feature_columns
# Describe every column of X to the estimator.
my_feature_columns = Feat_Columns(X)
# Reset the Keras backend state before building a new model.
tf.keras.backend.clear_session()
# Upper bound on the number of training steps (5000).
IT = int(5e3)
classifier = tf.estimator.DNNClassifier(feature_columns=my_feature_columns,
# Hidden layers.
hidden_units= [100, 12, 3],
# The model must choose between 3 classes.
n_classes=len(Labels))
# Train on the shuffled, repeating training pipeline; max_steps ends the otherwise endless stream.
classifier.train(input_fn=lambda: input_fn(X_train, y_train, training=True), max_steps = IT)
# Evaluate on the held-out test set (training=False: no shuffle, no repeat).
result = classifier.evaluate(input_fn=lambda: input_fn(X_test, y_test, training=False))
# Clear the cell output produced so far, so only the table below is shown.
clear_output()
display(pd.DataFrame(result, index = ['']).round(4))
# predict() yields one dict per test sample; materialise them all.
pred_dicts = list(classifier.predict(input_fn=lambda: input_fn(X_test, y_test, training=False)))
clear_output()
# Stack the 'probabilities' entry of every prediction into one array.
probs = np.array([pred['probabilities'] for pred in pred_dicts])
def ROC_Curve(y_test, probs, n_classes, FS = 7, ax = False, pad = 0.01):
    """Plot one-vs-rest ROC curves plus their micro- and macro-averages.

    Parameters
    ----------
    y_test : array-like of integer class labels
        One-hot encoded internally with ``tf.keras.utils.to_categorical``.
    probs : numpy.ndarray
        Class scores; column ``i`` (``probs[:, i]``) is the score of class ``i``.
    n_classes : int
        Number of classes, i.e. number of columns read from ``probs``.
    FS : float, default 7
        Side length of the square figure created when no axes is supplied.
    ax : matplotlib axes, ``False`` or ``None``
        Axes to draw on. ``False`` (the default) or ``None`` creates a new figure.
    pad : float, default 0.01
        Margin added around the [0, 1] range on both axes.

    Returns
    -------
    None
        The curves are drawn on ``ax``.
    """
    # One-hot encode the labels so each class can be scored one-vs-rest.
    y_test_cat = tf.keras.utils.to_categorical(y_test, num_classes=n_classes, dtype='float32')

    # ROC curve and area for each class.
    fpr, tpr, roc_auc = {}, {}, {}
    for i in range(n_classes):
        fpr[i], tpr[i], _ = metrics.roc_curve(y_test_cat[:, i], probs[:, i])
        roc_auc[i] = metrics.auc(fpr[i], tpr[i])

    # Micro-average: pool every (sample, class) decision into one binary problem.
    fpr["micro"], tpr["micro"], _ = metrics.roc_curve(y_test_cat.ravel(), probs.ravel())
    roc_auc["micro"] = metrics.auc(fpr["micro"], tpr["micro"])

    # Macro-average: interpolate every per-class curve on the union of all
    # false-positive rates, then average the curves point-wise.
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(n_classes):
        # np.interp replaces scipy's `interp`, a deprecated alias of it that
        # newer SciPy releases no longer provide.
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= n_classes
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = metrics.auc(fpr["macro"], tpr["macro"])

    # Create a figure only when the caller did not supply axes. The identity
    # check avoids comparing an axes object (or an array of axes) with ==.
    if ax is False or ax is None:
        fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(FS, FS))

    _ = ax.plot(fpr["micro"], tpr["micro"],
                label='micro-average ROC curve (area = {0:0.2f})'.format(roc_auc["micro"]),
                color='deeppink', linestyle=':', linewidth=4)
    _ = ax.plot(fpr["macro"], tpr["macro"],
                label='macro-average ROC curve (area = {0:0.2f})'.format(roc_auc["macro"]),
                color='navy', linestyle=':', linewidth=4)

    # The three colours are cycled, so they repeat when n_classes > 3.
    colors = cycle(['Aqua', 'DarkOrange', 'CornflowerBlue'])
    for i, color in zip(range(n_classes), colors):
        _ = ax.plot(fpr[i], tpr[i], color=color, lw=2,
                    label='ROC curve of class {0} (area = {1:0.2f})'.format(i, roc_auc[i]))

    # Diagonal reference line from (0, 0) to (1, 1).
    _ = ax.plot([0, 1], [0, 1], 'k--', lw=2)
    _ = ax.set_xlabel('False Positive Rate (FPR)')
    _ = ax.set_ylabel('True Positive Rate (TPR)')
    _ = ax.set_title('Receiver Operating Characteristic (ROC) Curves')
    _ = ax.legend(loc="lower right", fontsize = 12)
    _ = ax.set_xlim([-pad, 1 + pad])
    _ = ax.set_ylim([-pad, 1 + pad])
# end
ROC_Curve(y_test, probs, n_classes=len(Labels), FS=8)
# Test set: the predicted class is the most probable one for each sample.
y_pred = probs.argmax(axis=1).reshape(-1, 1)
Confusion_Matrix = metrics.confusion_matrix(y_test, y_pred)
Results = pd.DataFrame(metrics.classification_report(y_test, y_pred, target_names=Labels, output_dict=True)).T
display(Results.round(2))

fig, ax = plt.subplots(1, 2, figsize=(14, 5))
fig.suptitle('Test Set', fontsize=18)
# Left panel: raw counts.
_ = sns.heatmap(Confusion_Matrix, annot=True, annot_kws={"size": 14}, cmap="Blues", ax=ax[0],
                linewidths=0.2, cbar_kws={"shrink": 1})
# Right panel: each row divided by its total, i.e. per-true-class rates.
Confusion_Matrix = Confusion_Matrix.astype('float') / Confusion_Matrix.sum(axis=1, keepdims=True)
_ = sns.heatmap(Confusion_Matrix, annot=True, annot_kws={"size": 14}, cmap="Greens", ax=ax[1],
                linewidths=0.2, vmin=0, vmax=1, cbar_kws={"shrink": 1})
# Both panels share the same axis labels and class tick labels.
for _panel, _title in zip(ax, ['Confusion Matrix', 'Normalized Confusion Matrix']):
    _ = _panel.set_xlabel('Predicted labels')
    _ = _panel.set_ylabel('True labels')
    _ = _panel.set_title(_title)
    _ = _panel.xaxis.set_ticklabels(Labels)
    _ = _panel.yaxis.set_ticklabels(Labels)
del _panel, _title
In this second classification, the optimizer's learning rate changes over time: it follows an exponential-decay schedule.
# Reset the Keras backend state before building the second model.
tf.keras.backend.clear_session()
classifier = tf.estimator.DNNClassifier(feature_columns=my_feature_columns,
# Hidden layers.
hidden_units= [1024, 512, 256],
# The model must choose between 3 classes.
n_classes=len(Labels),
# Adam with an exponentially decaying learning rate: initial rate 0.1, decay rate 0.96,
# decay_steps equal to the training budget IT. The optimizer is passed as a callable.
optimizer=lambda: tf.keras.optimizers.Adam(learning_rate=tf.compat.v1.train.exponential_decay(learning_rate=0.1,
global_step=tf.compat.v1.train.get_global_step(), decay_steps=IT,decay_rate=0.96)))
# Train for at most IT steps on the shuffled, repeating training pipeline.
classifier.train(input_fn=lambda: input_fn(X_train, y_train, training=True), max_steps= IT)
# Evaluate on the held-out test set (training=False: no shuffle, no repeat).
result = classifier.evaluate(input_fn=lambda: input_fn(X_test, y_test, training=False))
# Clear the cell output produced so far, so only the table below is shown.
clear_output()
display(pd.DataFrame(result, index = ['']).round(4))
# predict() yields one dict per test sample; materialise them all.
pred_dicts = list(classifier.predict(input_fn=lambda: input_fn(X_test, y_test, training=False)))
clear_output()
# NOTE(review): n_classes is assigned here but the visible code below passes len(Labels) directly.
n_classes = len(Labels)
# Stack the 'probabilities' entry of every prediction into one array.
probs = np.array([pred['probabilities'] for pred in pred_dicts])
ROC_Curve(y_test, probs, n_classes=len(Labels), FS=8)
# Test set: the predicted class is the most probable one for each sample.
y_pred = probs.argmax(axis=1).reshape(-1, 1)
Confusion_Matrix = metrics.confusion_matrix(y_test, y_pred)
Results = pd.DataFrame(metrics.classification_report(y_test, y_pred, target_names=Labels, output_dict=True)).T
display(Results.round(2))

fig, ax = plt.subplots(1, 2, figsize=(14, 5))
fig.suptitle('Test Set', fontsize=18)
# Left panel: raw counts.
_ = sns.heatmap(Confusion_Matrix, annot=True, annot_kws={"size": 14}, cmap="Blues", ax=ax[0],
                linewidths=0.2, cbar_kws={"shrink": 1})
# Right panel: each row divided by its total, i.e. per-true-class rates.
Confusion_Matrix = Confusion_Matrix.astype('float') / Confusion_Matrix.sum(axis=1, keepdims=True)
_ = sns.heatmap(Confusion_Matrix, annot=True, annot_kws={"size": 14}, cmap="Greens", ax=ax[1],
                linewidths=0.2, vmin=0, vmax=1, cbar_kws={"shrink": 1})
# Both panels share the same axis labels and class tick labels.
for _panel, _title in zip(ax, ['Confusion Matrix', 'Normalized Confusion Matrix']):
    _ = _panel.set_xlabel('Predicted labels')
    _ = _panel.set_ylabel('True labels')
    _ = _panel.set_title(_title)
    _ = _panel.xaxis.set_ticklabels(Labels)
    _ = _panel.yaxis.set_ticklabels(Labels)
del _panel, _title